package com.atguigu.chapter11;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/7/24 10:03
 */
public class Flink08_Time_ProcessingTime {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        
        DataStreamSource<WaterSensor> stream = env.fromElements(
            new WaterSensor("sensor_1", 1L, 10),
            new WaterSensor("sensor_1", 2L, 20),
            new WaterSensor("sensor_2", 3L, 30),
            new WaterSensor("sensor_1", 4L, 40),
            new WaterSensor("sensor_1", 5L, 50)
        );
        
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        
        // 需要在schema的最后添加一个新的字段用来表示处理时间
        Table table = tenv.fromDataStream(stream, $("id"), $("ts"), $("vc"), $("pt").proctime());
        table.execute().print();
    
    }
}
